Vector Autoregression (VAR) from scratch (NumPy)#
Goals#
Model multivariate time series jointly (not one series at a time)
Understand and visualize cross-dependencies between variables
Understand stationarity/stability in multivariate systems
Select lag order with information criteria
Implement VAR estimation + impulse response functions (IRFs) with NumPy
Visualize variable interactions and IRFs with Plotly
import sys
import numpy as np
import pandas as pd
import plotly
import plotly.graph_objects as go
import os
import plotly.io as pio
from plotly.subplots import make_subplots

# Clean white theme for every figure in this notebook.
pio.templates.default = "plotly_white"
# Renderer can be overridden via env var (e.g. PLOTLY_RENDERER=browser).
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
# Compact array printing for readability.
np.set_printoptions(precision=4, suppress=True)
# Fixed seed so the simulations below are reproducible.
SEED = 42
rng = np.random.default_rng(SEED)
# Record the environment the notebook was run with.
print("Python:", sys.version.split()[0])
print("NumPy:", np.__version__)
print("Pandas:", pd.__version__)
print("Plotly:", plotly.__version__)
Python: 3.12.9
NumPy: 1.26.2
Pandas: 2.1.3
Plotly: 6.5.2
1) Multivariate time series modeling (what VAR is)#
A multivariate time series observes multiple variables at each time step.
Let the observation at time \(t\) be a vector \(\mathbf{y}_t \in \mathbb{R}^k\).
A VAR(p) model explains current values using \(p\) past lags of all variables:
\(\mathbf{y}_t = \mathbf{c} + \mathbf{A}_1 \mathbf{y}_{t-1} + \cdots + \mathbf{A}_p \mathbf{y}_{t-p} + \mathbf{u}_t\)
where:
\(\mathbf{c} \in \mathbb{R}^k\) is an intercept vector
\(\mathbf{A}_\ell \in \mathbb{R}^{k\times k}\) are coefficient matrices (one per lag)
\(\mathbf{u}_t \in \mathbb{R}^k\) is a zero-mean innovation (shock) with covariance \(\Sigma_u\).
2) Cross-dependencies between variables#
The key difference from fitting k separate AR models is that VAR includes cross-lag terms.
For k = 2 and p = 1:
\(\begin{bmatrix} x_t \\ y_t \end{bmatrix} = \mathbf{c} + \begin{bmatrix} a_{11} & a_{12} \\ a_{21} & a_{22} \end{bmatrix} \begin{bmatrix} x_{t-1} \\ y_{t-1} \end{bmatrix} + \mathbf{u}_t\)
\(a_{12}\) is the effect of \(y_{t-1}\) on \(x_t\)
\(a_{21}\) is the effect of \(x_{t-1}\) on \(y_t\)
These off-diagonal terms let VAR capture feedback loops (e.g., policy ↔ economy, supply ↔ price, etc.).
3) Stationarity / stability in multivariate systems#
For univariate AR models, stationarity relates to roots of a characteristic polynomial. For VAR, the analogous concept is stability: shocks should not explode over time.
VAR(1)#
A sufficient and (under mild assumptions) standard condition for stability is:
\(\rho(\mathbf{A}_1) < 1\)
where \(\rho(\cdot)\) is the spectral radius (largest absolute eigenvalue).
VAR(p) companion form#
Define the state vector \(\mathbf{z}_t = \begin{bmatrix} \mathbf{y}_t^\top & \mathbf{y}_{t-1}^\top & \cdots & \mathbf{y}_{t-p+1}^\top \end{bmatrix}^\top\)
and the companion matrix \(\mathbf{F} \in \mathbb{R}^{kp\times kp}\):
\(\mathbf{F} = \begin{bmatrix} \mathbf{A}_1 & \mathbf{A}_2 & \cdots & \mathbf{A}_{p-1} & \mathbf{A}_p \\ \mathbf{I}_k & \mathbf{0} & \cdots & \mathbf{0} & \mathbf{0} \\ \mathbf{0} & \mathbf{I}_k & \cdots & \mathbf{0} & \mathbf{0} \\ \vdots & & \ddots & & \vdots \\ \mathbf{0} & \mathbf{0} & \cdots & \mathbf{I}_k & \mathbf{0} \end{bmatrix}\)
Then the system is stable/stationary if all eigenvalues of \(\mathbf{F}\) lie inside the unit circle.
def companion_matrix(A_list: list[np.ndarray]) -> np.ndarray:
    """Stack VAR(p) coefficient matrices into the (k*p, k*p) companion form.

    The top block row holds [A_1, ..., A_p]; identity blocks on the
    sub-diagonal shift the lagged state forward one step.

    Raises ValueError if A_list is empty or any A_i is not (k, k).
    """
    if not A_list:
        raise ValueError("A_list must be non-empty")
    k = A_list[0].shape[0]
    p = len(A_list)
    for A in A_list:
        if A.shape != (k, k):
            raise ValueError("All A_i must have shape (k, k)")
    F = np.zeros((k * p, k * p), dtype=float)
    F[:k] = np.hstack(A_list)
    # Identity blocks: block-row b+1 copies block b of the state.
    for block in range(p - 1):
        r = k * (block + 1)
        c = k * block
        F[r : r + k, c : c + k] = np.eye(k)
    return F
def check_stationarity(A_list: list[np.ndarray]) -> dict:
    """Assess VAR stability from the companion-matrix eigenvalues.

    The system is stable/stationary iff every eigenvalue lies strictly
    inside the unit circle, i.e. the spectral radius is < 1.
    """
    eigvals = np.linalg.eigvals(companion_matrix(A_list))
    radius = float(np.abs(eigvals).max())
    result = {
        "is_stationary": radius < 1.0,
        "spectral_radius": radius,
        "eigvals": eigvals,
    }
    return result
4) Low-level NumPy estimation (OLS)#
With observations \(\{\mathbf{y}_t\}_{t=1}^T\) and lag order p, define the stacked matrices:
Dependent block: $$ \mathbf{Y} = \begin{bmatrix} \mathbf{y}_{p+1}^\top \\ \vdots \\ \mathbf{y}_T^\top \end{bmatrix} \in \mathbb{R}^{(T-p)\times k} $$
Design matrix (constant + lags): $$ \mathbf{X} = \begin{bmatrix} 1 & \mathbf{y}_p^\top & \cdots & \mathbf{y}_1^\top \\ \vdots & \vdots & & \vdots \\ 1 & \mathbf{y}_{T-1}^\top & \cdots & \mathbf{y}_{T-p}^\top \end{bmatrix} \in \mathbb{R}^{(T-p)\times (1+kp)} $$
Collect coefficients into: $\( \mathbf{B} = \begin{bmatrix} \mathbf{c} & \mathbf{A}_1 & \cdots & \mathbf{A}_p \end{bmatrix}^\top \in \mathbb{R}^{(1+kp)\times k} \)$
Then: $\( \mathbf{Y} = \mathbf{X}\mathbf{B} + \mathbf{U} \)$
and the OLS estimator is: $\( \hat{\mathbf{B}} = (\mathbf{X}^\top\mathbf{X})^{-1}\mathbf{X}^\top\mathbf{Y} \)$
We’ll compute it with np.linalg.lstsq (numerically safer than forming the inverse explicitly).
def build_var_matrices(y: np.ndarray, p: int, include_const: bool = True) -> tuple[np.ndarray, np.ndarray]:
    """Stack the VAR(p) regression matrices (X, Y) from raw data.

    y: (T, k) array, rows are time steps, columns are variables.
    Returns:
        X: (T-p, (1 if include_const else 0) + k*p) — each row is
           [1?, y_{t-1}, ..., y_{t-p}] for one target time t.
        Y: (T-p, k) — the targets y_t.
    Raises ValueError for non-2D input, p < 1, or too few observations.
    """
    data = np.asarray(y, dtype=float)
    if data.ndim != 2:
        raise ValueError("y must have shape (T, k)")
    if p < 1:
        raise ValueError("p must be >= 1")
    T, _k = data.shape
    if T <= p:
        raise ValueError("Need more observations than p")
    n_rows = T - p
    cols = [np.ones((n_rows, 1))] if include_const else []
    # Lag ell contributes rows y_{p-ell} ... y_{T-1-ell}.
    cols.extend(data[p - lag : T - lag] for lag in range(1, p + 1))
    return np.hstack(cols), data[p:]
def fit_var_ols(y: np.ndarray, p: int, include_const: bool = True) -> dict:
    """Estimate a VAR(p) by multivariate least squares.

    Equation-by-equation OLS and multivariate least squares coincide here
    because every equation shares the same regressors, so one lstsq solve
    on the stacked system handles all k equations at once.

    Returns a dict with the stacked coefficients ("beta"), the intercept
    vector, the per-lag coefficient matrices ("A", each (k, k)), the
    residuals, the residual covariance ("Sigma_u", divided by T-p with no
    degrees-of-freedom correction), and the design matrices (X, Y).
    """
    X, Y = build_var_matrices(y=y, p=p, include_const=include_const)
    beta_hat = np.linalg.lstsq(X, Y, rcond=None)[0]  # (n_features, k)
    residuals = Y - X @ beta_hat
    n_obs, k = Y.shape
    Sigma_u = residuals.T @ residuals / n_obs
    offset = 1 if include_const else 0
    intercept = beta_hat[0, :] if include_const else np.zeros(k)
    # Row block i of beta maps lagged variables -> equations; transpose so
    # A_i[row, col] is the effect of lagged variable `col` on variable `row`.
    A_list = [beta_hat[offset + i * k : offset + (i + 1) * k, :].T for i in range(p)]
    return {
        "p": p,
        "k": k,
        "include_const": include_const,
        "beta": beta_hat,
        "intercept": intercept,
        "A": A_list,
        "residuals": residuals,
        "Sigma_u": Sigma_u,
        "X": X,
        "Y": Y,
    }
5) Lag selection (choosing p)#
Increasing p makes the model more flexible, but it increases parameters quickly:
With \(k\) variables, each lag adds \(k \times k\) coefficients. With \(p\) lags, there are \(k^2 p\) coefficients (plus \(k\) intercepts).
A common approach is to fit VAR models for p = 1..p_max and pick the lag that minimizes an information criterion.
Using the residual covariance estimate \(\hat{\Sigma}_u(p)\) and parameter count \(m(p) = k^2 p + k\) (if intercept included):
\(\mathrm{AIC}(p) = \log\det\hat{\Sigma}_u(p) + \frac{2\,m(p)}{T_p}, \qquad \mathrm{BIC}(p) = \log\det\hat{\Sigma}_u(p) + \frac{\log(T_p)\,m(p)}{T_p}\)
where \(T_p = T - p\) is the number of usable rows after lagging.
def _slogdet_psd(matrix: np.ndarray, jitter: float = 1e-10, max_tries: int = 6) -> float:
"""Stable log(det(.)) for covariance-like matrices, with diagonal jitter if needed."""
matrix = np.asarray(matrix, dtype=float)
if matrix.shape[0] != matrix.shape[1]:
raise ValueError("matrix must be square")
jitter_value = float(jitter)
for _ in range(max_tries):
sign, logdet = np.linalg.slogdet(matrix)
if sign > 0:
return float(logdet)
matrix = matrix + jitter_value * np.eye(matrix.shape[0])
jitter_value *= 10.0
raise np.linalg.LinAlgError("slogdet failed: matrix not positive definite after jitter")
def information_criteria_var(y: np.ndarray, p: int, include_const: bool = True) -> dict:
    """Fit VAR(p) and report AIC/BIC built from log|Sigma_u|.

    Both criteria use the per-observation penalty m / T_p where
    m = k^2 * p (+ k intercepts) and T_p = T - p usable rows.
    """
    fitted = fit_var_ols(y=y, p=p, include_const=include_const)
    k = fitted["k"]
    n_rows = fitted["Y"].shape[0]
    m = k * k * p + (k if include_const else 0)
    logdet = _slogdet_psd(fitted["Sigma_u"])
    return {
        "p": p,
        "aic": float(logdet + 2.0 * m / n_rows),
        "bic": float(logdet + np.log(n_rows) * m / n_rows),
        "n_params": int(m),
        "T_p": int(n_rows),
    }
def select_lag(y: np.ndarray, p_max: int, include_const: bool = True) -> tuple[pd.DataFrame, dict]:
    """Score VAR(1..p_max) and return the IC table plus per-criterion winners."""
    rows = []
    for p in range(1, p_max + 1):
        rows.append(information_criteria_var(y=y, p=p, include_const=include_const))
    table = pd.DataFrame.from_records(rows).set_index("p").sort_index()
    # Lag that minimizes each criterion.
    winners = {criterion: int(table[criterion].idxmin()) for criterion in ("aic", "bic")}
    return table, winners
6) Intuition: an interacting system (simulate a stable VAR)#
Think of a VAR as a simple discrete-time dynamical system:
Each variable responds to its own past (inertia / momentum)
Each variable can respond to other variables’ past (coupling / interaction)
We’ll simulate a 3-variable VAR(2) with cross-effects, then:
choose a lag order
fit the coefficients
visualize interactions and IRFs.
def cholesky_with_jitter(matrix: np.ndarray, jitter: float = 1e-10, max_tries: int = 6) -> np.ndarray:
    """Lower-triangular Cholesky factor, adding diagonal jitter on failure.

    Each failed attempt adds `jitter * I` to the matrix and escalates the
    jitter by 10x, up to max_tries attempts.

    Raises np.linalg.LinAlgError if the matrix is still not positive
    definite after all attempts.
    """
    matrix = np.asarray(matrix, dtype=float)
    jitter_value = float(jitter)
    for _ in range(max_tries):
        try:
            return np.linalg.cholesky(matrix)
        except np.linalg.LinAlgError:
            matrix = matrix + jitter_value * np.eye(matrix.shape[0])
            jitter_value *= 10.0
    # Bug fix: the original bare `raise` here executed after the except
    # block had exited, so sys.exc_info() was cleared and it raised
    # RuntimeError("No active exception to re-raise") instead of the
    # intended error. Raise the meaningful exception explicitly.
    raise np.linalg.LinAlgError("cholesky failed: matrix not positive definite after jitter")
def simulate_var(
    A_list: list[np.ndarray],
    Sigma_u: np.ndarray,
    n_steps: int,
    *,
    intercept: np.ndarray | None = None,
    burn_in: int = 200,
    rng: np.random.Generator | None = None,
) -> np.ndarray:
    """Draw a sample path from y_t = c + sum_i A_i y_{t-i} + u_t.

    Shocks u_t are Gaussian with covariance Sigma_u (via its Cholesky
    factor). The first burn_in + p steps start from zeros and are
    discarded so the returned path is near the stationary distribution.

    Returns an (n_steps, k) array.
    """
    if rng is None:
        rng = np.random.default_rng(0)
    k = A_list[0].shape[0]
    p = len(A_list)
    if intercept is None:
        intercept = np.zeros(k)
    chol = cholesky_with_jitter(Sigma_u)
    total = n_steps + burn_in + p
    y = np.zeros((total, k), dtype=float)
    for t in range(p, total):
        ar = np.zeros(k)
        for lag, A in enumerate(A_list, start=1):
            ar = ar + A @ y[t - lag]
        # Correlated shock: chol @ z has covariance Sigma_u for z ~ N(0, I).
        y[t] = intercept + ar + chol @ rng.standard_normal(k)
    return y[burn_in + p :, :]
# Variable names for the 3-variable system.
names = ["Output", "Inflation", "PolicyRate"]
# A stable VAR(2) with cross-dependencies (off-diagonals)
A1_true = np.array(
    [
        [0.55, 0.10, -0.15],
        [0.15, 0.45, 0.05],
        [-0.20, 0.25, 0.40],
    ]
)
# Lag-2 coefficients: mild own-lag damping, no cross terms.
A2_true = np.array(
    [
        [-0.15, 0.00, 0.00],
        [0.00, -0.10, 0.00],
        [0.00, 0.00, -0.10],
    ]
)
# Innovation covariance: unit variances with correlated shocks
# (non-zero off-diagonals), which motivates orthogonalized IRFs later.
Sigma_u_true = np.array(
    [
        [1.00, 0.30, 0.15],
        [0.30, 1.00, 0.10],
        [0.15, 0.10, 1.00],
    ]
)
# Confirm the chosen coefficients give a stable (stationary) system.
stationarity_true = check_stationarity([A1_true, A2_true])
stationarity_true
{'is_stationary': True,
'spectral_radius': 0.4152628017403557,
'eigvals': array([0.0926+0.3248j, 0.0926-0.3248j, 0.3683+0.1919j, 0.3683-0.1919j,
0.2392+0.1381j, 0.2392-0.1381j])}
# Simulate 800 observations from the true VAR(2) and wrap them in a
# DataFrame with the variable names as columns.
y = simulate_var([A1_true, A2_true], Sigma_u=Sigma_u_true, n_steps=800, rng=rng)
df = pd.DataFrame(y, columns=names)
df.head()
| Output | Inflation | PolicyRate | |
|---|---|---|---|
| 0 | 1.196020 | -0.288455 | 0.672731 |
| 1 | -0.200209 | -0.739549 | 0.524796 |
| 2 | 0.320734 | 0.348595 | -1.525616 |
| 3 | 1.008172 | -0.622605 | -0.386434 |
| 4 | -0.921646 | -0.184484 | -1.343241 |
# Plot all three simulated series on a shared time axis.
fig = go.Figure()
for name in names:
    fig.add_trace(go.Scatter(x=df.index, y=df[name], mode="lines", name=name))
fig.update_layout(
    title="Simulated interacting system (VAR(2))",
    xaxis_title="t",
    yaxis_title="value",
    legend_title="variable",
    height=350,
)
fig.show()
# Score VAR(1..6) with AIC/BIC; `best` holds the per-criterion minimizers.
ic_table, best = select_lag(df.values, p_max=6, include_const=True)
ic_table
| aic | bic | n_params | T_p | |
|---|---|---|---|---|
| p | ||||
| 1 | 0.059707 | 0.130046 | 12 | 799 |
| 2 | -0.004962 | 0.118251 | 21 | 798 |
| 3 | 0.005384 | 0.181576 | 30 | 797 |
| 4 | 0.008751 | 0.238028 | 39 | 796 |
| 5 | 0.017841 | 0.300307 | 48 | 795 |
| 6 | 0.039101 | 0.374861 | 57 | 794 |
best
{'aic': 2, 'bic': 2}
# Refit at the BIC-selected lag and confirm the estimated system is stable.
p_hat = best["bic"]
model = fit_var_ols(df.values, p=p_hat, include_const=True)
stationarity_hat = check_stationarity(model["A"])
{k: stationarity_hat[k] for k in ["is_stationary", "spectral_radius"]}
{'is_stationary': True, 'spectral_radius': 0.4896672335687779}
def plot_var_coefficients(A_list: list[np.ndarray], names: list[str]) -> go.Figure:
    """Heatmap grid of the estimated A matrices, one subplot per lag.

    A shared, zero-centered color scale makes coefficient signs and
    magnitudes comparable across lags; only the first subplot carries
    the colorbar.
    """
    n_lags = len(A_list)
    scale_limit = max(float(np.abs(A).max()) for A in A_list)
    fig = make_subplots(
        rows=1,
        cols=n_lags,
        shared_yaxes=True,
        subplot_titles=[f"Lag {lag}" for lag in range(1, n_lags + 1)],
    )
    for position, coef in enumerate(A_list, start=1):
        heatmap = go.Heatmap(
            z=coef,
            x=names,
            y=names,
            colorscale="RdBu",
            zmin=-scale_limit,
            zmax=scale_limit,
            showscale=position == 1,
            colorbar=dict(title="coef"),
        )
        fig.add_trace(heatmap, row=1, col=position)
    fig.update_layout(
        title="Estimated variable interactions (A matrices)",
        height=360,
        width=320 * n_lags,
    )
    fig.update_xaxes(title_text="Source (lagged variable)")
    fig.update_yaxes(title_text="Target (current variable)")
    return fig
plot_var_coefficients(model["A"], names).show()
7) Impulse response functions (IRFs)#
IRFs answer: “If I shock variable j today, how do variables respond over future horizons?”
A VAR has a moving-average representation:
\(\mathbf{y}_t = \boldsymbol{\mu} + \sum_{h=0}^{\infty} \Psi_h \mathbf{u}_{t-h}\)
where each \(\Psi_h \in \mathbb{R}^{k\times k}\) maps a shock at horizon h to responses.
For stable VARs, \(\Psi_h \to 0\) as \(h \to \infty\).
If shocks are correlated (\(\Sigma_u\) not diagonal), IRFs are often orthogonalized via a Cholesky factorization of \(\Sigma_u\). This introduces an ordering assumption (the variable order matters).
def irf_matrices(A_list: list[np.ndarray], horizons: int) -> np.ndarray:
    """Non-orthogonalized IRFs Psi_0..Psi_H from companion-matrix powers.

    Psi_h is the top-left (k, k) block of F^h, so Psi_0 = I. Returns an
    array of shape (H+1, k, k).
    """
    k = A_list[0].shape[0]
    F = companion_matrix(A_list)
    responses = np.empty((horizons + 1, k, k), dtype=float)
    power = np.eye(F.shape[0])  # F^0
    for h in range(horizons + 1):
        responses[h] = power[:k, :k]
        power = power @ F
    return responses
def orthogonalize_irf(Psi: np.ndarray, Sigma_u: np.ndarray) -> np.ndarray:
    """Convert reduced-form IRFs to orthogonalized IRFs.

    Post-multiplies each Psi_h by the lower Cholesky factor of Sigma_u,
    so columns become responses to one-standard-deviation structural
    shocks (the result depends on the variable ordering).
    """
    lower = cholesky_with_jitter(Sigma_u)
    return Psi @ lower
def plot_irf_grid(Psi: np.ndarray, names: list[str], *, title: str) -> go.Figure:
    """k x k grid of IRF plots: row = responding variable, col = shocked one.

    Each panel shows Psi[:, i, j] over horizons 0..H with a dotted zero
    reference line.
    """
    n_horizons = Psi.shape[0] - 1
    k = Psi.shape[1]
    steps = np.arange(n_horizons + 1)
    titles = [f"{names[i]} ← {names[j]}" for i in range(k) for j in range(k)]
    fig = make_subplots(rows=k, cols=k, shared_xaxes=True, subplot_titles=titles)
    for target in range(k):
        for source in range(k):
            response = go.Scatter(
                x=steps,
                y=Psi[:, target, source],
                mode="lines",
                line=dict(width=2),
                showlegend=False,
            )
            zero_line = go.Scatter(
                x=steps,
                y=np.zeros_like(steps),
                mode="lines",
                line=dict(color="black", width=1, dash="dot"),
                showlegend=False,
            )
            fig.add_trace(response, row=target + 1, col=source + 1)
            fig.add_trace(zero_line, row=target + 1, col=source + 1)
    fig.update_layout(title=title, height=240 * k, width=240 * k)
    # Axis labels only on the left column and bottom row.
    for row in range(1, k + 1):
        fig.update_yaxes(title_text="response", row=row, col=1)
    for col in range(1, k + 1):
        fig.update_xaxes(title_text="horizon", row=k, col=col)
    return fig
# Compute IRFs out to 20 periods, orthogonalize with the estimated
# residual covariance, and plot the full response grid.
H = 20
Psi = irf_matrices(model["A"], horizons=H)
Psi_orth = orthogonalize_irf(Psi, Sigma_u=model["Sigma_u"])
plot_irf_grid(Psi_orth, names=names, title="Orthogonalized IRFs (Cholesky, order = column order)").show()
Takeaways#
VAR models multiple time series jointly and captures cross-dependencies via off-diagonal coefficients in \(\mathbf{A}_\ell\).
Multivariate stationarity is a stability condition: eigenvalues of the companion matrix must be inside the unit circle.
Lag selection trades bias/variance; AIC often prefers larger models, BIC is more conservative.
IRFs make dynamics interpretable; orthogonalization requires an identification assumption (ordering matters).